package org.example.listener

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark._
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerTaskEnd}
import org.example.common.Logging
import org.example.constant.ApolloConst
import org.example.utils.KafkaUtil

/**
  * Monitors the start and stop of a Spark application and publishes an alert
  * to Kafka (topic "sparkWarn") whenever a task ends in failure.
  *
  * @param conf the Spark configuration of the monitored application
  */
class LifecycleListener(conf: SparkConf) extends SparkListener with Logging {

  // Accumulated status payload; enriched by the lifecycle callbacks below and
  // serialized to JSON when a failure alert is sent.
  private val message = new JSONObject()
  message.put("errorMessage", "")
  message.put("warnMessage", "")
  message.put("isError", false)

  /** Records application identity and start time when the application starts. */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    message.put("applicationId", applicationStart.appId.getOrElse(""))
    message.put("applicationName", applicationStart.appName)
    message.put("startTime", applicationStart.time)
  }

  /** Callback invoked when the whole application ends. */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    message.put("endTime", applicationEnd.time)
  }

  /** Callback invoked when a task ends; sends a Kafka alert if the task failed. */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    if (info != null && taskEnd.stageAttemptId != -1) {
      // ExceptionFailure and TaskKilled both extend TaskFailedReason, so a
      // single type match covers every failure variant (the originals'
      // separate cases for them were unreachable/redundant). A successful
      // task's reason is Success, which falls through to None.
      val errorMessage: Option[String] = taskEnd.reason match {
        case failed: TaskFailedReason => Some(failed.toErrorString)
        case _                        => None
      }
      // Gate the Kafka send on THIS task's failure rather than the sticky
      // "isError" flag: the flag is never reset, so the original code
      // re-sent an alert on every task end after the first failure,
      // including successful ones.
      errorMessage.foreach { reason =>
        message.put("isError", true)
        message.put("errorMessage", reason)
        val producer: KafkaProducer[String, String] = KafkaUtil.getKafkaProducer(ApolloConst.bootstrap)
        // try/finally guarantees the producer is closed even if send throws,
        // preventing a connection leak (the original skipped close on error).
        try {
          producer.send(new ProducerRecord("sparkWarn", message.toJSONString))
        } finally {
          producer.close()
        }
      }
    }
  }
}
